Vodafone / O2 Dwell Data Analysis#
Setup#
Imports and Functions#
import duckdb
import geopandas as gpd
import pandas as pd
import plotly.express as px
from pathlib import Path
from getpass import getpass
import h3pandas
from shapely.geometry import box
from sklearn.cluster import DBSCAN
import numpy as np
import folium
pd.options.plotting.backend = "plotly" # set plotly as the default plotting engine for pandas
pd.set_option("future.no_silent_downcasting", True) # opt in to future pandas behaviour: no silent dtype downcasting
def fetch_csv(url: str) -> pd.DataFrame:
    """Read and parse a CSV using DuckDB, returning a pandas DataFrame.

    Parameters:
    - url: Path or URL of the CSV file.

    Returns: The parsed CSV as a DataFrame.
    """
    # Use DuckDB's Python relational API rather than f-string interpolation of
    # the url into SQL text, so quotes/special characters in the path cannot
    # break (or inject into) the query. read_csv auto-detects the schema, as
    # read_csv_auto did.
    with duckdb.connect(database=":memory:") as conn:
        return conn.read_csv(url).df()
def process_pings(df: pd.DataFrame, check_data_quality: bool = False) -> gpd.GeoDataFrame:
    """Process the raw GPS pings DataFrame into a GeoDataFrame.

    Parameters:
    - df: DataFrame with 'user_id', 'datetime', 'lat' and 'lon' columns.
    - check_data_quality: When True, validate dtypes, value ranges, missing
      values and duplicate rows first (raises AssertionError on failure).

    Returns: GeoDataFrame with an EPSG:4326 point geometry, 'datetime'
    renamed to 'pinged_at', sorted by user and ping timestamp.
    """
    if check_data_quality:
        dt_dtypes = ("datetime64[ns]", "<M8[us]")
        assert df["datetime"].dtype in dt_dtypes, "Invalid datetime data type"
        # `not ....any().any()` instead of `.sum().any() == False`: same truth
        # value, idiomatic, and avoids summing every column first
        assert not df.isna().any().any(), "Missing data in DataFrame"
        assert not df.duplicated().any(), "Duplicate rows in DataFrame"
        assert df["lat"].dtype == "float64", "Invalid latitude data type"
        assert df["lon"].dtype == "float64", "Invalid longitude data type"
        assert df["lat"].between(-90, 90).all(), "Latitude values outside of valid range"
        assert df["lon"].between(-180, 180).all(), "Longitude values outside of valid range"

    def geometry(df: pd.DataFrame):
        # build WGS84 point geometries from the lon/lat columns (def instead
        # of a lambda assignment, per PEP 8 E731)
        return gpd.points_from_xy(df["lon"], df["lat"], crs="EPSG:4326")

    return (
        df.assign(geometry=geometry)
        .drop(columns=["lat", "lon"])
        .rename(columns={"datetime": "pinged_at"})
        .pipe(gpd.GeoDataFrame)  # convert to GeoDataFrame
        .sort_values(by=["user_id", "pinged_at"], ignore_index=True)
    )
Process/Load Data#
# prompt for the (private) data url, then cache the processed pings locally as
# gzipped parquet; later runs read straight from the cache
url = getpass()
out_path = Path("../data/gps.parquet.gzip")
if not out_path.exists():
    out_path.parent.mkdir(exist_ok=True)
    raw = fetch_csv(url)
    process_pings(raw, check_data_quality=True).to_parquet(out_path, compression="gzip")
    del raw  # free the unprocessed copy
print(f"{out_path}: {out_path.stat().st_size / 1e6:.2f} MB (compressed)")
gdf = gpd.read_parquet(out_path)
../data/gps.parquet.gzip: 33.44 MB (compressed)
Exploratory Data Analysis#
# preview the first few processed pings
gdf.head(3)
| user_id | pinged_at | geometry | |
|---|---|---|---|
| 0 | 0001347E-B3AD-4B84-80B9-14312A5CDBF4 | 2018-01-12 13:33:17 | POINT (-0.11597 51.49539) |
| 1 | 0001347E-B3AD-4B84-80B9-14312A5CDBF4 | 2018-01-12 13:38:22 | POINT (-0.10594 51.49857) |
| 2 | 0001347E-B3AD-4B84-80B9-14312A5CDBF4 | 2018-01-12 13:43:24 | POINT (-0.09604 51.49963) |
# summary statistics of the ping timestamps (transposed for readability)
gdf["pinged_at"].describe().to_frame().T
| count | mean | min | 25% | 50% | 75% | max | |
|---|---|---|---|---|---|---|---|
| pinged_at | 1770011 | 2018-01-14 03:25:27.649975 | 2018-01-08 00:00:20 | 2018-01-10 16:56:02 | 2018-01-15 04:41:03 | 2018-01-17 15:44:51 | 2018-01-19 23:59:41 |
# headline counts: distinct users and total pings in the dataset
users = gdf["user_id"].nunique()
pings = len(gdf)
print(f"The data contains {pings:,} pings for {users:,} unique users.")
The data contains 1,770,011 pings for 31,606 unique users.
Pings Per User#
# pings per user, sorted so the heaviest users come first
counts_per_user = gdf.groupby("user_id", as_index=False).size()
user_pings = counts_per_user.rename(columns={"size": "pings"}).sort_values(by="pings", ascending=False)
# heavy right tail: extra high percentiles to characterise the top users
tail_percentiles = [0.1, 0.25, 0.5, 0.75, 0.8, 0.95, 0.99, 0.999]
user_pings.describe(percentiles=tail_percentiles).round(2).T
| count | mean | std | min | 10% | 25% | 50% | 75% | 80% | 95% | 99% | 99.9% | max | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| pings | 31606.0 | 56.0 | 57.15 | 20.0 | 22.0 | 27.0 | 39.0 | 65.0 | 74.0 | 145.0 | 257.0 | 548.37 | 3430.0 |
# ECDF of pings per user; the y axis is shown as a percentage for readability
# (plotly's default ECDF reports a probability density)
fig = px.ecdf(user_pings, x="pings", title="ECDF of User Pings (aka x% of users have at least n pings)")
fig.update_layout(
    xaxis_title="Number of Pings",
    yaxis_title="% of Users",
    yaxis_tickformat=".0%",
    showlegend=False,
)
fig.show()
Pings Per Day#
# daily ping counts; pd.Grouper keeps days with zero pings in the output
daily = gdf.groupby([pd.Grouper(key="pinged_at", freq="D")], as_index=False).size()
ping_dates = (
    daily.rename(columns={"size": "pings"})
    .sort_values(by="pinged_at")
    .assign(
        day_name=lambda df: df["pinged_at"].dt.day_name(),
        week_number=lambda df: df["pinged_at"].dt.isocalendar().week,
    )
)
ping_dates.plot(x="pinged_at", y="pings", title="Daily Pings")
Week over Week Comparison#
# overlay ISO weeks to compare day-of-week patterns; days with zero pings are excluded
nonzero_days = ping_dates[ping_dates["pings"] > 0]
nonzero_days.plot(x="day_name", y="pings", color="week_number", title="Daily Pings")
Examine Spatial Extent & Macro Cluster Locations#
Here we see the overall spatial extent of the data, as well as the locations of point-of-interest (POI) clusters. The POI clusters are determined by the DBSCAN algorithm, which groups together points that lie within a certain distance of one another.
We can see that major tube and train stations, along with areas of high vehicular traffic, form the key clusters.
# two layers, a bounding box of the spatial extent (of all data) and some randomly sampled points
total_spatial_extent = gpd.GeoDataFrame(geometry=[box(*gdf.total_bounds)], crs=gdf.crs)
sampled_points = gdf.sample(10_000, random_state=42).drop(columns=["user_id", "pinged_at"])
# cluster the sampled points to identify areas of high density (i.e. find hotspots)
# this is macro level clustering, without considering time or each user's individual patterns
kms_per_radian = 6371.0088  # mean radius of the Earth in kilometers
distance_km = 1
epsilon = distance_km / kms_per_radian  # distance threshold expressed in radians
dbscan = DBSCAN(eps=epsilon, min_samples=20, metric="haversine", algorithm="ball_tree")
# BUGFIX: scikit-learn's haversine metric expects (lat, lon) in RADIANS;
# passing degrees while eps is in radians shrank the effective search radius
# by a factor of ~57 (1 km intended -> ~17 m actual)
coordinates = np.radians(np.column_stack((sampled_points["geometry"].y, sampled_points["geometry"].x)))
sampled_points["cluster"] = dbscan.fit_predict(coordinates)
# create a map
m = sampled_points.explore(
    name="Sampled Points", column="cluster", categorical=True, legend=False  # too many clusters to be useful
)
total_spatial_extent.explore(m=m, name="Total Spatial Extent", color="grey")
folium.LayerControl().add_to(m)
m
Make this Notebook Trusted to load map: File -> Trust Notebook
Calculate Dwell Times#
The first step is to group user ‘journeys’ by applying sensible time and distance cutoffs between subsequent pings. In other words, pings that are close in both time and space are grouped into a single ‘journey’.
def calculate_dwell_attributes(
    gdf: gpd.GeoDataFrame, distance_threshold: float = 600, time_threshold: float = 600
) -> gpd.GeoDataFrame:
    """
    Identifies dwells in a GeoDataFrame by comparing consecutive points against distance and time thresholds.
    Adds columns for distance/time differences, dwell flag, and dwell IDs.
    NOTE: expects rows pre-sorted by time, and a projected CRS whose units are
    metres (the caller reprojects to EPSG:27700) so `distance` is comparable
    to `distance_threshold`.
    Parameters:
    - gdf: GeoDataFrame with 'geometry' and 'pinged_at' columns.
    - distance_threshold: Distance limit in metres to define a dwell.
    - time_threshold: Time limit in seconds to define a dwell.
    Returns: GeoDataFrame with added dwell-related columns.
    """
    return gdf.assign(
        # distance (CRS units) and elapsed seconds between consecutive pings;
        # the first row has no predecessor, so both diffs are NaN (-> not a dwell)
        lag_distance_diff=lambda df: df["geometry"].distance(df["geometry"].shift()),
        lag_time_diff=lambda df: df["pinged_at"].diff().dt.total_seconds(),
        is_potential_dwell=lambda df: (
            # apply small threshold to remove consecutive pings with no movement / time difference
            (df["lag_time_diff"] > 0.01)
            & (df["lag_distance_diff"] > 0.01)
            # discretise time and distance differences to identify potential dwells
            & (df["lag_distance_diff"] < distance_threshold)
            & (df["lag_time_diff"] < time_threshold)
        ),
        # every non-dwell row is a "break", so the cumulative sum of breaks
        # gives a stable id shared by all rows of the same dwell
        dwell_id=lambda df: (~df["is_potential_dwell"]).cumsum(),
    )
# assign each ping an H3 cell index, compute dwell attributes per user, then
# aggregate the pings of each (user, dwell, cell) into a single dwell record
h3_resolution = 10
h3_col_name = f"h3_{h3_resolution:02d}"  # h3pandas names the column after the resolution
cols = gdf.columns.to_list() + [h3_col_name]
dwells = ( # gdf already sorted by user_id and pinged_at
    gdf.h3.geo_to_h3(resolution=h3_resolution, set_index=False)
    .to_crs("EPSG:27700") # use British National Grid for distance calculations (metres)
    .groupby(["user_id"], group_keys=False, sort=False, as_index=False)[cols]
    .apply(calculate_dwell_attributes)  # per user, so lag diffs never cross users
    .groupby(["user_id", "dwell_id", h3_col_name], as_index=False)
    .agg(
        dwell_start=("pinged_at", "min"),
        dwell_end=("pinged_at", "max"),
        dwell_pings=("pinged_at", "count"),
        geometries=("geometry", list),  # keep all ping points for later mapping
    )
    .query("dwell_pings > 1") # at least two pings (per 'user-journey') to define a dwell within a single H3 Index
    .assign(dwell_duration_seconds=lambda df: (df["dwell_end"] - df["dwell_start"]).dt.total_seconds())
)
Distribution of pings per dwell
# distribution of pings per dwell, binned to the nearest 10 via round(-1)
dwells["dwell_pings"].round(-1).value_counts().to_frame().sort_index()
| count | |
|---|---|
| dwell_pings | |
| 0 | 170309 |
| 10 | 1718 |
| 20 | 105 |
| 30 | 29 |
| 40 | 14 |
| 50 | 4 |
| 60 | 3 |
| 70 | 3 |
| 80 | 4 |
| 90 | 1 |
| 100 | 1 |
| 110 | 4 |
| 140 | 2 |
| 150 | 2 |
| 260 | 1 |
| 350 | 1 |
| 430 | 1 |
# inspect the single dwell with the most pings
dwells.query("dwell_pings == dwell_pings.max()")
| user_id | dwell_id | h3_10 | dwell_start | dwell_end | dwell_pings | geometries | dwell_duration_seconds | |
|---|---|---|---|---|---|---|---|---|
| 406827 | 435771D7-6329-4664-8C4A-C1366A1CDDB0 | 52 | 8a194ad148e7fff | 2018-01-09 23:33:45 | 2018-01-10 04:44:43 | 431 | [POINT (530981.4749883055 179168.01740387268),... | 18658.0 |
Visualise Dwells#
Single user dwell visualisation (most pinged user)
# map every dwell of the most-pinged user, coloured by dwell id; timestamps
# are stringified because folium tooltips cannot serialise datetimes
single_user = dwells[dwells["user_id"] == "435771D7-6329-4664-8C4A-C1366A1CDDB0"]
(
    single_user.explode(column="geometries")
    .pipe(gpd.GeoDataFrame)
    .set_geometry("geometries", crs="EPSG:27700")
    .assign(
        dwell_start=lambda df: df["dwell_start"].astype(str),
        dwell_end=lambda df: df["dwell_end"].astype(str),
    )
    .explore(column="dwell_id", categorical=True)
)
Make this Notebook Trusted to load map: File -> Trust Notebook
High Dwell Ping Count Visualisation
# map the dwells with an unusually high ping count
high_ping_dwells = dwells[dwells["dwell_pings"] > 6]
(
    high_ping_dwells.explode(column="geometries")
    .pipe(gpd.GeoDataFrame)
    .set_geometry("geometries", crs="EPSG:27700")
    .drop(columns=["dwell_start", "dwell_end"])  # keep the tooltip light
    .explore(column="dwell_id", categorical=True, legend=False)
)
Make this Notebook Trusted to load map: File -> Trust Notebook
Average Dwell Times Across H3 Cells#
# mean dwell duration per H3 cell, with each cell's polygon as the geometry
h3_avg_dwells = (
    dwells.groupby([h3_col_name], as_index=True)["dwell_duration_seconds"]
    .agg(avg_dwell_duration_seconds="mean")
    .h3.h3_to_geo_boundary()
)
# map only cells averaging over 100 seconds, class-binned by natural breaks
long_dwell_cells = h3_avg_dwells[h3_avg_dwells["avg_dwell_duration_seconds"] > 100]
long_dwell_cells.explore(column="avg_dwell_duration_seconds", scheme="NaturalBreaks")
Make this Notebook Trusted to load map: File -> Trust Notebook